/*********************************************************************************
 *      Copyright:  (C) 2014 EAST
 *                  All rights reserved.
 *
 *       Filename:  msgque.c
 *    Description:  This file is used to deal message queue. 
 *                 
 *        Version:  1.0.0(07/29/2014)
 *         Author:  fulinux <fulinux@sina.com>
 *      ChangeLog:  1, Release initial version on "07/29/2014 08:30:29 AM"
 *                 
 ********************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/time.h>

#include "msgque.h"

msgque_t *msgque_init (int fid, mode_t mode)
{
    char fifo_name[30];

    msgque_t *pmsgque;

    if(fid == -1){
        errno = EINVAL;
        return NULL;
    }

    pmsgque = (msgque_t *)malloc(sizeof(msgque_t));
    pmsgque->msgbuf = (msgbuf_t *)malloc(sizeof(msgbuf_t));

    sprintf(fifo_name, "/tmp/msgque-%d", fid);

    if(access(fifo_name, F_OK) == -1){
        if(mkfifo(fifo_name, 0777) != 0){
            perror("MSGQUE MKFIFO");
            free(pmsgque->msgbuf->mtext);
            free(pmsgque);
            return NULL;
        }
    }  

    pmsgque->fd = open(fifo_name, mode);
    if(pmsgque->fd == -1){
        perror("MSGQUE OPEN FIFO");
        free(pmsgque->msgbuf->mtext);
        free(pmsgque);
        return NULL;
    }

    pmsgque->key = ftok(fifo_name, fid);
    if(-1 == pmsgque->key){
        perror("MSGQUE FTOK");
        if(pmsgque->fd == -1)
            close(pmsgque->fd);
        free(pmsgque->msgbuf->mtext);
        free(pmsgque);
        return NULL;
    }

    pmsgque->msgid = msgget(pmsgque->key, IPC_EXCL); /* 检查消息队列是否存在 */
    if ( pmsgque->msgid < 0 ){
        pmsgque->msgid = msgget(pmsgque->key, IPC_CREAT | 0666); /* 创建消息队列 */
        if (pmsgque->msgid < 0 ){
            perror("MSGGET CREAT");
            free(pmsgque->msgbuf->mtext);
            free(pmsgque);
            return NULL;
        }
    }

    /* 清空消息队列中的恶历史数据. */
    while(-1 != msgrcv(pmsgque->msgid, (void *)pmsgque->msgbuf, MSGSIZE, 0, IPC_NOWAIT));

    return pmsgque;
} /* ----- End of msgque_init() ----- */

int msgque_setdebug(msgque_t *msgque, int flag)
{
    msgque_t *pmsgque = msgque;

    pmsgque->debug = flag;

    return 0;
}

int msgque_read (msgque_t *msgque, int msgflg)
{
    int i;
    msgque_t *pmsgque = msgque;

    if(pmsgque == NULL){
        errno = EINVAL;
        return -1;
    }

    pmsgque->msgid = msgget(pmsgque->key, IPC_EXCL); /* 检查消息队列是否存在 */
    if ( pmsgque->msgid < 0 ){
        pmsgque->msgid = msgget(pmsgque->key, IPC_CREAT | 0666); /* 创建消息队列 */
        if (pmsgque->msgid < 0 ){
            perror("MSGGET CREAT");
            free(pmsgque->msgbuf->mtext);
            free(pmsgque);
            return -1;
        }
    }

    /* 接收消息队列 */
    if(-1 == msgrcv(pmsgque->msgid, (void *)pmsgque->msgbuf, MSGSIZE, 0, msgflg)){
        if(pmsgque->debug)
            fprintf(stderr, "MSGRCV: %s\n", strerror(errno));
        return -1;
    }

    if(pmsgque->debug){
        printf ("read mtype: %ld\n", pmsgque->msgbuf->mtype);
        for(i = 0; i < MSGSIZE; i++){
            printf ("%d ", pmsgque->msgbuf->mtext[i]);
        }
        printf ("\n");
    }

    return 0;
} /* ----- End of msgque_read()  ----- */

int msgque_read_timeout(msgque_t *msgque, struct timeval *timeout)
{
    int ret;
    char buf[5];
    fd_set rfds;
    msgque_t *pmsgque = msgque;

    FD_ZERO(&rfds);
    FD_SET(pmsgque->fd, &rfds);

    ret = select(pmsgque->fd + 1, &rfds, NULL, NULL, timeout);
    if(ret > 0 && FD_ISSET(pmsgque->fd, &rfds)){
        if(-1 == msgque_read(pmsgque, IPC_NOWAIT)){
        }

        if(-1 == read(pmsgque->fd, buf, 5)){
            perror("READ");
            return -1;
        }

        return 0;
    }

    if(ret == 0)
        errno = ETIMEDOUT;
    return -1;
}

int msgque_write (msgque_t *msgque, int msgflg)
{
    int i;
    msgque_t *pmsgque = msgque;

    if(pmsgque == NULL){
        errno = EINVAL;
        return -1;
    }

    pmsgque->msgid = msgget(pmsgque->key, IPC_EXCL); /* 检查消息队列是否存在 */
    if ( pmsgque->msgid < 0 ){
        pmsgque->msgid = msgget(pmsgque->key, IPC_CREAT | 0666); /* 创建消息队列 */
        if (pmsgque->msgid < 0 ){
            perror("MSGGET CREAT");
            free(pmsgque->msgbuf->mtext);
            free(pmsgque);
            return -1;
        }
    }

    if(pmsgque->fd != -1){
        if(-1 == write(pmsgque->fd, "fifo", 5)){
            perror("WRITE");
            return -1;
        }
    }

    /* 发送消息队列 */
    if(-1 == msgsnd(pmsgque->msgid, (void *)pmsgque->msgbuf, MSGSIZE, msgflg)){
        if(pmsgque->debug)
            fprintf(stderr, "MSGSND: %s\n", strerror(errno));
        return -1;
    }

    if(pmsgque->debug){

        printf ("write mtype: %ld\n", pmsgque->msgbuf->mtype);
        for(i = 0; i < MSGSIZE; i++){
            printf ("%d ", pmsgque->msgbuf->mtext[i]);
        }
        printf ("\n");
    }

    return 0;
} /* ----- End of msgque_write()  ----- */

int msgque_exit (msgque_t *msgque)
{
    msgque_t *pmsgque = msgque;

    if(NULL == pmsgque->msgbuf->mtext)
        free(pmsgque->msgbuf->mtext);

    if(NULL == pmsgque)
        free(pmsgque);

    /* 删除消息队列 */
    pmsgque->msgid = msgget(pmsgque->key, IPC_EXCL); /* 检查消息队列是否存在 */
    if ( pmsgque->msgid < 0 ){
        if (msgctl(pmsgque->msgid, IPC_RMID, 0) == -1){
            if(pmsgque->debug)
                fprintf(stderr, "MSGQUE EXIT: %s\n", strerror(errno));
            return -1;
        }
    }

    return 0;
} /* ----- End of msgque_exit()  ----- */

